package com.gene.tlonpo.listener;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gene.tlonpo.config.LonpoStaticContext;
import com.gene.tlonpo.exception.TlonpoException;
import com.gene.tlonpo.guard.GuardDistributedInfo;
import com.gene.tlonpo.guard.tool.GuardAggregationTool;
import com.gene.tlonpo.manager.type.GuardManageType;
import com.gene.tlonpo.service.handler.BusinessHandler;
import com.gene.tlonpo.service.handler.TlonpoBusinessHandler;
import com.gene.tlonpo.service.remote.common.model.DataDto;
import com.gene.tlonpo.util.IPUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.Calendar;
import java.util.List;

/**
 * @author geneX
 * 事件消息监听
 * 当轮询关心的事件发生的时候，事件触发方需要发送一条消息
 * 此监听的目的：通过监听的消息内容匹配服务本地和远程是否有关心此事件的轮询，有就callback
 * 消息一律不重复消费
 */
@Slf4j
public class EventMqListener implements MessageListenerConcurrently {

    private static BusinessHandler tlonpoBusinessHandler = TlonpoBusinessHandler.tlonpoBusinessHandler;

    private static final String BIZ_KEY = "bizKey";

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        list.forEach(messageExt -> {
            try {
                JSONObject jsonObject = JSON.parseObject(new String(messageExt.getBody()));

                //取出bizkey
                String bizKey = String.valueOf(jsonObject.get(BIZ_KEY));

                //匹配本地轮询
                tryLocalCallback(bizKey);

                //远程
                tryRemoteCallback(bizKey);
            } catch (Exception te) {
                //消息不重复消费
                log.error("tlonpo mq message consumer fail,not retry,msgId:{}", messageExt.getMsgId(), te);
            }
        });

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private void tryRemoteCallback(String bizKey) {
        //查询分布式的guard信息
        List<GuardDistributedInfo> guardList = GuardAggregationTool.getGuard(bizKey, GuardManageType.DISTRIBUTED);

        if (CollectionUtils.isEmpty(guardList)) {
            log.info("EventMqListener tryRemoteCallback,there is no distribute guards");
            return;
        }

        //远程访问
        guardList.forEach(guard -> {
            //本地ip则不处理
            if (IPUtil.isLocalAddress(guard.getLocalIp(), guard.getPort())) {
                //删除分布式的guard
                GuardAggregationTool.removeGuard(guard, GuardManageType.DISTRIBUTED);
                return;
            }

            //远程ip则处理,调用远程
            doRemoteCallback(guard);
        });
    }

    private void doRemoteCallback(GuardDistributedInfo guard) {
        tlonpoBusinessHandler.remoteCallback(new DataDto(guard));
    }

    private void tryLocalCallback(String bizKey) {
        tlonpoBusinessHandler.localCallback(bizKey);
    }
}
